package com.popreal.scheduler;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import net.vsame.url2sql.helper.SqlHelper;
import net.vsame.url2sql.helper.WebHelper;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import redis.clients.jedis.Jedis;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.popreal.scheduler.helper.BAERedisHelper;
import com.popreal.scheduler.service.ExecutorService;
import com.popreal.scheduler.service.impl.HttpExecutorImpl;
import com.popreal.shop.helper.RedisHelper.RedisCallback;

/**
 * Background thread that polls the Redis sorted set {@link #KEY} for due tasks and hands
 * each one to a thread pool for execution.
 *
 * <p>Every member of the sorted set is a JSON string describing one task; its score is the
 * epoch second at which the task becomes due. The fields this class reads from the JSON are
 * {@code type} (selects the executor), {@code config} (a nested JSON string passed to the
 * executor), {@code interval} (seconds until the next run) and {@code id} (written to the
 * log table).
 *
 * <p>When a task is picked up its score is pushed {@link #RE_TRY_SECONDS} into the future, so
 * a task whose execution never reports back becomes due again after that delay. After a task
 * finishes it is re-queued at {@code now + interval} and one row is written to the log table.
 */
public class MainExecutor extends Thread {

	private final static Log LOG = LogFactory.getLog(MainExecutor.class);
	/** Key of the Redis sorted set that holds the scheduled tasks. */
	public static final String KEY = "scheduler_http";
	/** Seconds after which a picked-up task that has not been re-queued becomes due again (10 minutes). */
	protected static final long RE_TRY_SECONDS = 60 * 10;
	/** Loop flag; volatile because {@link #destory()} is called from a thread other than this one. */
	private volatile boolean isStart = true;
	
	/** Thread pool on which the individual tasks are executed. */
	@Autowired
	private ThreadPoolTaskExecutor pool;
	/** Registry mapping the task's {@code type} field to the executor that runs it. */
	private Map<String, ExecutorService> map = new HashMap<String, ExecutorService>();
	
	/** Creates the scheduler with the default registry, which only knows the {@code "HTTP"} type. */
	public MainExecutor() {
		map.put("HTTP", new HttpExecutorImpl());
	}
	
	/**
	 * Replaces the executor registry.
	 *
	 * @param map task type to executor; the reference is stored as given, not copied
	 */
	public void setMap(Map<String, ExecutorService> map) {
		this.map = map;
	}

	/**
	 * Asks the polling loop to stop. The flag is checked once per loop iteration, so the
	 * thread ends after its current scan and sleep. (The misspelled name is kept for callers.)
	 */
	public void destory(){
		isStart = false;
	}
	
	/**
	 * Polling loop: scans for due tasks, then sleeps 800 ms, until {@link #destory()} is called
	 * or the thread is interrupted. A failing scan is logged and does not end the loop.
	 */
	@Override
	public void run() {
		System.out.println("Run Thread!");
		LOG.info("调度器开启");
		while (isStart) {
			try {
				scan();
			}catch(Exception e) {
				LOG.error("扫描器出错", e);
			}
			try {
				Thread.sleep(800);
			} catch (InterruptedException e) {
				// An interrupt is treated as a stop request: keep the flag set for whoever
				// inspects this thread afterwards and leave the loop instead of spinning.
				Thread.currentThread().interrupt();
				break;
			}
		}
		LOG.info("调度器销毁");
	}

	/**
	 * Collects every task whose score is between 0 and the current epoch second, moves its
	 * score {@link #RE_TRY_SECONDS} into the future, and submits it to the pool.
	 */
	private void scan() {
		final Set<String> executeSet = new HashSet<String>();
		BAERedisHelper.use(new RedisCallback<String>() {
			@Override
			public String execute(Jedis redis) {
				long now = System.currentTimeMillis() / 1000;
				Set<String> set = redis.zrangeByScore(KEY, 0, now);
				executeSet.addAll(set);
				
				// Push the score forward so the task is retried automatically if its
				// execution never re-queues it.
				// NOTE(review): the read and the re-score are separate commands; a member
				// removed in between would be re-added here. Confirm whether that matters.
				for(String key : set) {
					redis.zadd(KEY, now + RE_TRY_SECONDS, key);
				}
				return null;
			}
		});
		
		// Hand each due task to the thread pool.
		for(final String json : executeSet) {
			pool.execute(new Runnable() {
				@Override
				public void run() {
					executeThread(json);
				}
			});
		}
		
	}
	
	/**
	 * Runs one task on the calling (pool) thread: executes it, re-queues it for its next run
	 * and writes a log row. A payload that cannot be parsed is logged and skipped; it stays in
	 * the sorted set with the retry score assigned by {@link #scan()}.
	 *
	 * @param json the sorted-set member, i.e. the task serialized as JSON
	 */
	private void executeThread(final String json) {
		long begin = System.currentTimeMillis();
		final JSONObject o = parseTask(json);
		if (o == null) {
			return;
		}
		
		// Run the task; any failure is recorded in the log row instead of being propagated.
		String result = null;
		String isSuccess = "Y";
		try {
			result = runTask(o);
		}catch (Exception e) {
			LOG.warn("Task execution failed, taskId=" + o.getString("id"), e);
			// getMessage() may be null (e.g. NullPointerException); fall back to toString().
			result = e.getMessage() != null ? e.getMessage() : e.toString();
			isSuccess = "N";
		}
		
		// Back into the queue for the next run.
		try {
			requeue(json, o);
		}catch (Exception e) {
			// Still write the log row below; the retry score set by scan() keeps the task alive.
			LOG.error("Re-queue failed, taskId=" + o.getString("id"), e);
		}
		
		saveLog(o.getString("id"), result, System.currentTimeMillis() - begin, isSuccess);
	}
	
	/** Parses the task payload; returns null (after logging) when it is malformed or empty. */
	private JSONObject parseTask(String json) {
		JSONObject task;
		try {
			task = JSON.parseObject(json);
		}catch (Exception e) {
			LOG.error("Unparseable task payload, skipped: " + json, e);
			return null;
		}
		if (task == null) {
			LOG.error("Empty task payload, skipped: " + json);
		}
		return task;
	}
	
	/** Looks up the executor for the task's {@code type} and runs it with the parsed {@code config}. */
	private String runTask(JSONObject task) throws Exception {
		String type = task.getString("type");
		ExecutorService executor = map.get(type);
		if (executor == null) {
			throw new IllegalStateException("No executor registered for task type: " + type);
		}
		return executor.execute(task, JSON.parseObject(task.getString("config")));
	}
	
	/** Sets the task's score to the current epoch second plus its {@code interval} field. */
	private void requeue(final String json, final JSONObject task) {
		BAERedisHelper.use(new RedisCallback<String>() {
			@Override
			public String execute(Jedis redis) {
				long now = System.currentTimeMillis() / 1000;
				int interval = task.getIntValue("interval");
				redis.zadd(KEY, now + interval, json);
				return null;
			}
		});
	}
	
	/** Inserts one execution record into MySQL; a failure is logged and not propagated. */
	private void saveLog(String taskId, String result, long duration, String isSuccess) {
		try {
			WebHelper.init(null, null);
			SqlHelper.execute("INSERT INTO `tast_log_t` (`id`, `taskId`, `result`, `duration`, isSuccess) VALUES (${~UUID}, ?, ?, ?, ?)", 
					taskId, result, duration, isSuccess);
		}catch (Exception e) {
			LOG.error("保存到 Mysql 出错", e);
		}finally {
			WebHelper.remove();
		}
	}
	
	
}
